package com.hjs.homework

import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.ForeachWriter

class MySQLWriter(url: String, user: String, pwd: String) extends ForeachWriter[BusInfo] {

  var conn: Connection = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(url, user, pwd)
    true
  }

  override def process(value: BusInfo): Unit = {
    val lglat = value.lglat.split("_")
    val p = conn.prepareStatement("insert into bus_info (deployNum,lng,lat) values(?,?,?)")
    p.setString(1, value.deployNum)
    p.setString(2, lglat(0))
    p.setString(3, lglat(1))
    p.execute()
  }

  override def close(errorOrNull: Throwable): Unit = {
    conn.close()
  }
}
